MapTask 工作机制
MR 核心阶段:map,sort,copy,sort,reduce
MapTask 分为两个阶段,分别是 map 和 sort。在执行 context.write(keyout-valueout)
之前都属于 map 阶段,然后如果该 job 有 ReduceTask,那么在进行 sort 排序。
在执行 context.write()
并不是直接将 key-value 写出,而是先攒到一个缓存区 MapOutPutBuffer
中。每个记录在进入缓冲区时,先调用 Partitioner
(分区器)为记录计算一个区号。例如以单词统计为例,数据进入缓冲区后格式可能如下,index 为索引,partition为分区号,keystart,valuestart 分别表示 key 的起始位置和 value 的起始位置,key 为统计的单词,value 单词出现的个数。
1 2 3 4
| index partition keystart valuestart key value 0 1 0 6 hadoop 1 1 1 7 11 hive 1 2 0 xx xx spark 1
|
缓存区有两个线程,一个为收集线程,收集线程负责将 Mapper 写出的 key-value 收集到缓冲区。第二个为溢写线程,溢写线程会在缓冲区已经收集了 80% 空间的数据时【缓冲区大小默认为 100M,80% 即 80M】,被唤醒,唤醒后负责将缓冲区收集的数据溢写到磁盘。一旦缓冲区满足溢写条件,先对缓冲区的所有数据,进行一次排序,利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。排序时,只排索引(记录有序的索引的顺序),不移动数据。达到溢写条件后,按照分区,进行溢写,每次溢写生成一个临时文件 spillx.out【x为分区号】。溢写多次,生成多个临时文件。当所有的数据全部被溢写结束,最后一批数据不满足溢写条件会执行一次 flush。
写结束后,会对所有的溢写片段执行一次 merge (将多个临时文件合并成一个最终结果)操作。合并时,将所有临时文件同一个分区的数据进行汇总,汇总后再排序【归并排序】,最后合并为一个文件,这个文件每个分区中的 key-value 都是有序的。
ReduceTask 工作机制
- Copy 阶段:ReduceTask 从各个MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge 阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
- Reduce阶段:reduce() 函数将计算结果写到HDFS上。
Partition分区
默认的分区策略是根据 key 的 hashCode 对 ResultTasks 个数取模得到的。可以从配置中 mapreduce.job.partitioner.class
参数来设置分区器,如果没有设置,就使用 HashPartitioner
作为分区器。分区的数量和 ReduceTask 的数量一致,一个 ReduceTask 对应着一个分区,所以如果想设置多个分区,那么就需要设置 ReduceTask 的数量。
分区案例
将统计结果按照手机归属地不同省份输出到不同文件中(分区)。期望输出数据,手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
PartitionerFlowBeanDriver
完整代码如下,需要在里面设置 ReduceTask 的数量和自定义使用的分区器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package com.yanrs.mr.Partitioner;
import com.yanrs.mr.flowbean.FlowBean; import com.yanrs.mr.flowbean.FlowBeanMapper; import com.yanrs.mr.flowbean.FlowBeanReducer; import com.yanrs.mr.wordcount.WCMapper; import com.yanrs.mr.wordcount.WCReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class PartitionerFlowBeanDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mrinput/flowbean"); Path outPath = new Path("/mroutput/partition"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setNumReduceTasks(5); job.setPartitionerClass(MyPatitioner.class);
job.setJobName("PartitionerFlowBean");
job.setMapperClass(PartitionerFlowBeanMapper.class); job.setReducerClass(PartitionerFlowBeanReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(PartitionerFlowBean.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
MyPatitioner
完整代码如下,接收到每个 key 之后,根据需求将不同的 key 划分到不同的 Partitioner 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yanrs.mr.Partitioner;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
public class MyPatitioner extends Partitioner<Text, PartitionerFlowBean> {
@Override public int getPartition(Text text, PartitionerFlowBean partitionerFlowBean, int numPartitions) { int partitionNum = 0; String suffix = text.toString().substring(0, 3); switch(suffix){ case "136": partitionNum = 1; break; case "137": partitionNum = 2; break; case "138": partitionNum = 3; break; case "139": partitionNum = 4; break; } return partitionNum; } }
|
运行之后可以看到结果文件为 5 份,136 开头的号码在 part-r-00001 中,137 开头的号码在 part-r-00002 中,138 开头的号码在 part-r-00003 中,139 开头的号码在 part-r-00004 中,其余的在 part-r-00000 中。
完整代码
排序
自定义比较器的两种方法
1. 定义 Mappper 输出的key,让 key 实现 WritableComparable, 实现 CompareTo()
2. 自定义类时,继承 WriableComparator 或实现 RawCompartor,使用时设置 mapreduce.job.output.key.comparator.class=自定义的类
排序案例
Mapper 输出 key 为内置类型
对消耗的总流量进行升序排序。在之前 flowbean 案例的结果的基础上,对用户手机号所消耗的总流量升序排序。之前 flowbean 结果如下。
1 2 3 4 5 6
| 13470253144 FlowBean{upFlow=180, downFlow=180, sumFlow=360} 13509468723 FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684} 13560439638 FlowBean{upFlow=918, downFlow=4938, sumFlow=5856} 13568436656 FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232} 13590439668 FlowBean{upFlow=1116, downFlow=954, sumFlow=2070} ...
|
因为之前案例手机号是 key ,所以输出结果默认是按照手机号 key 进行排序的。即需要注意的一点是,排序只针对 key 进行 什么是 key,那么就对这个字段进行排序。
现在的需求的是根据总流量排序,所以要将总流量做为 key 。上述结果文件总流量可以通过 =
拆分,然后去除 }
获取到。Sort1Mapper 代码如下,这里需要注意的是 mapper 的输出,因为排序只针对 mapper 输出的 key 排序,所以这里 key 是总流量的大小,即类型为 LongWritable【内置类型】,value 为手机号。还需要注意如何拆分字符串能获取到总流量和手机号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.yanrs.mr.sort1;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Sort1Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{ private LongWritable outKey = new LongWritable(); private Text outValue = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("="); outKey.set(Long.parseLong(words[3].replace("}", ""))); outValue.set(words[0].split("\t")[0]); context.write(outKey, outValue); } }
|
Sort1Reducer 中需要注意的是,将 mapper 的输出结果进行顺序对换,即 reducer 的输出 key 为手机号,输出 value 为排序好的总流量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.yanrs.mr.sort1;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Sort1Reducer extends Reducer <LongWritable, Text, Text, LongWritable>{ @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value: values) { context.write(value, key); } } }
|
Sort1Driver 中需要注意设置 mapper 的输出类型和 reducer 的输出类型,因为现在两者不一致了,所以需要单独设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.yanrs.mr.sort1;
import com.yanrs.mr.flowbean.FlowBean; import com.yanrs.mr.flowbean.FlowBeanMapper; import com.yanrs.mr.flowbean.FlowBeanReducer; import com.yanrs.mr.wordcount.WCMapper; import com.yanrs.mr.wordcount.WCReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class Sort1Driver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mroutput/flowbean/part-r-00000"); Path outPath = new Path("/mroutput/flowbean/sort1"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setJobName("sort1");
job.setMapperClass(Sort1Mapper.class); job.setReducerClass(Sort1Reducer.class);
job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
运行输出结果
1 2 3 4 5 6 7 8
| 13966251146 240 13729199489 240 13768778790 240 13846544121 264 13470253144 360 13956435636 1644 13590439668 2070 15959002129 2118
|
示例代码
继承 WriableComparator
对消耗的总流量进行降序排序。上面例子 对消耗的总流量进行升序排序 的例子中,因为 mapper 的输出是总流量,类型为 LongWritable,而且 LongWritable 实现了 WritableComparable 接口,并且有 CompareTo 方法,而且 CompareTo 方法是按照升序排序的,所以我在上述例子中使用的就是 LongWritable 实现的比较器,得到的是升序排序的结果。
在对消耗的总流量进行降序排序的例子中,我们需要自己实现一个比较器,实现比较器有两种方式,这里采用自定义类继承 WriableComparator,使用时在 Driver 中设置自定义的比较器即可。
Sort2Mapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.yanrs.mr.sort2;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Sort2Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{ private LongWritable outKey = new LongWritable(); private Text outValue = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("="); outKey.set(Long.parseLong(words[3].replace("}", ""))); outValue.set(words[0].split("\t")[0]); context.write(outKey, outValue); } }
|
Sort2Mapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.yanrs.mr.sort2;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Sort2Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{ private LongWritable outKey = new LongWritable(); private Text outValue = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("="); outKey.set(Long.parseLong(words[3].replace("}", ""))); outValue.set(words[0].split("\t")[0]); context.write(outKey, outValue); } }
|
Sort2Driver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package com.yanrs.mr.sort2;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class Sort2Driver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mroutput/flowbean/part-r-00000"); Path outPath = new Path("/mroutput/flowbean/sort2"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setJobName("sort1");
job.setSortComparatorClass(MyDescComparator.class);
job.setMapperClass(Sort2Mapper.class); job.setReducerClass(Sort2Reducer.class);
job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
MyDescComparator
1 2 3 4 5 6 7 8 9 10 11 12 13
| package com.yanrs.mr.sort2;
import org.apache.hadoop.io.WritableComparator;
public class MyDescComparator extends WritableComparator { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { long thisValue = readLong(b1, s1); long thatValue = readLong(b2, s2); return thisValue < thatValue ? 1 : (thisValue == thatValue ? 0 : -1); } }
|
运行输出结果
1 2 3 4 5 6
| 13509468723 117684 13975057813 59301 13568436656 29232 13736230513 27162 15043685818 7197 .....
|
示例代码
Mapper 输出 key 为自定义类型
对消耗的总流量进行降序排序。自定义 key 的时候需要实现 WritableComparable 接口,而不是以前的 Writable 接口。实现 WritableComparable 后重写 compareTo 方法,在里面实现要比较的逻辑即可。
FlowBeanSort3Mapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.yanrs.mr.sort3;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowBeanSort3Mapper extends Mapper<LongWritable, Text, FlowBeanSort3, Text>{ private Text outValue = new Text(); private FlowBeanSort3 outKey = new FlowBeanSort3();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); outKey.setUpFlow(Long.parseLong(words[words.length - 3])); outKey.setDownFlow(Long.parseLong(words[words.length - 2])); outKey.setSumFlow(Long.parseLong(words[words.length - 2]) + Long.parseLong(words[words.length - 3])); outValue.set(words[1]); context.write(outKey, outValue); } }
|
FlowBeanSort3Reducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.yanrs.mr.sort3;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowBeanSort3Reducer extends Reducer <FlowBeanSort3, Text, Text, FlowBeanSort3>{ @Override protected void reduce(FlowBeanSort3 key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value:values) { context.write(value, key); } } }
|
FlowBeanSort3Driver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package com.yanrs.mr.sort3;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowBeanSort3Driver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mrinput/flowbean"); Path outPath = new Path("/mroutput/flowbean/sort3"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setJobName("sort3");
job.setMapperClass(FlowBeanSort3Mapper.class); job.setReducerClass(FlowBeanSort3Reducer.class);
job.setMapOutputKeyClass(FlowBeanSort3.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanSort3.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
FlowBeanSort3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| package com.yanrs.mr.sort3;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
public class FlowBeanSort3 implements WritableComparable<FlowBeanSort3> { private long upFlow; private long downFlow; private long sumFlow;
@Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); }
@Override public void readFields(DataInput dataInput) throws IOException { upFlow = dataInput.readLong(); downFlow = dataInput.readLong(); sumFlow = dataInput.readLong(); }
public long getUpFlow() { return upFlow; }
public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
public long getDownFlow() { return downFlow; }
public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
public long getSumFlow() { return sumFlow; }
public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
public FlowBeanSort3() { }
@Override public String toString() { return "PartitionerFlowBean{" + "upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + '}'; }
@Override public int compareTo(FlowBeanSort3 flowBeanSort3) { return this.sumFlow -flowBeanSort3.getSumFlow() > 0?-1:1; } }
|
示例代码
实现 RawCompartor 接口
对消耗的总流量进行降序排序。实现 RawCompartor 接口后会重写两个 compare 方法,在一个方法中获取比较的对象,另一个方法中进行比较。
MyDescRawCompartor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.yanrs.mr.sort4;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator;
import java.io.IOException;
public class MyDescRawCompartor implements RawComparator<FlowBeanSort4> { private FlowBeanSort4 key1 = new FlowBeanSort4(); private FlowBeanSort4 key2 = new FlowBeanSort4(); private DataInputBuffer buffer = new DataInputBuffer(); @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { this.buffer.reset(b1, s1, l1); this.key1.readFields(this.buffer); this.buffer.reset(b2, s2, l2); this.key2.readFields(this.buffer); this.buffer.reset((byte[])null, 0, 0); } catch (IOException var8) { throw new RuntimeException(var8); }
return this.compare(this.key1, this.key2); }
@Override public int compare(FlowBeanSort4 o1, FlowBeanSort4 o2) { return o1.getSumFlow() - o2.getSumFlow() > 0?-1:1; } }
|
示例代码
Combiner
Combiner 实际上本质是一个 Reducer 类,Conbiner 只有在设置了之后,才会运行。combiner 的作用是在shuffle 阶段对相同 key 的 key-value 进行提前合并,以便在传输中可以减少磁盘 IO 和网络 IO。
Combiner 和 Reducer 的区别
Reducer 是在 reduce 阶段调用,Combiner 是在 shuffle 阶段调用【既有可能在 MapTask 端,也可能在ReduceTask 端】。但本质都是 Reducer 类,作用都是对有相同 key 的 key-value 进行合并。
Combiner 使用条件
Combiner 用在+,- 操作的场景,不能用在 *,/ 操作的场景。使用 Combiner 必须保证不能影响处理逻辑和结果。
使用时在 Driver 中设置Combiner 类即可 job.setCombinerClass(Reducer 类.class);
调用时机
MapTask 端调用:
- 每次溢写前会调用 Combiner 对溢写的数据进行局部合并。
- 在merge时如果溢写的分区数 >=3,如果设置了 Combiner,Combiner 会再次对数据进行 Combine。
ReduceTask 端调用:
- shuffle 线程拷贝多个 MapTask 同一分区的数据,拷贝后执行 merge 和 sort, 如果数据量过大,需要将部分数据先合并排序后,溢写到磁盘。如果设置了Combiner,Combiner 会再次运行。
案例
在之前的 flowbean 案例上,只需要在 Driver 中添加配置 job.setCombinerClass(FlowBeanReducer.class);
即可。
未添加 combine 之前结果如下:
添加 combine 之后结果如下
分组
分组通过分组比较器,对进入reduce的key进行对比,key相同的分为一组,一次性进入Reducer,被调用reduce方法。
自定义分组比较器
用户可以自定义 key 的分组比较器,自定义的比较器必须是一个 RawComparator类型的类然后实现compareTo()方法。如果没有设置 key 的分组比较器,默认采取在 Map 阶段排序时,key 的比较器。
分组案例
样例数据如上所示,现在需要求出每一个订单中最贵的商品。思路:将订单数据分装为 bean 对象,然后将 bean 做为 mapper 的输出 key,并让 bean 实现 WritableComparable 接口,重写 compareTo 方法,compareTo 方法中先对 订单 id 进行排序,若订单 id 相同再对成交金额进行排序。这样数据就是按照订单中金额有序排序的了。
总结
分区
总的分区数取决于reduceTask的数量,一个Job要启动几个reduceTask,取决于期望产生几个分区,每个分区最后都会生成一个结果文件。
当 reduceTask>1,尝试获取用户设置的 Partionner,如果没有设置使用内置的 HashPartitoner。如果reduceTask<=1, 系统默认提供一个 Partionner,它会将所有记录都分到0号区。
排序
每次溢写前,使用快速排序最后merge时,使用归并排序
比较器
如果用户自定义了比较器,MR 就使用用户自定义的比较器(RawComparator 类型),如果用户没有自定义,那么Mapper 输出的 Key 需要实现 WriableComparable 接口系统会自动提供比较器。不管是自己提供比较器还是实现WriableComparable 接口,最后在比较时,都是在调用自己实现的CompareTo 方法。
执行流程
- Partitioner计算分区
- 满足溢写条件,对所有数据进行排序,排序时用比较器对比 key,每次溢写前的排序,默认使用的快排。如果设置了 Combiner,在溢写前,排好序的结果会先被 Combiner 进行 combine 再溢写。
- 2过程会发生 N 次
- 所有的溢写片段需要 merge 为一个总的文件,合并时,使用归并排序,对 key 进行排序。如果溢写片段数量超过 3,在溢写成一个最终的文件时,Combiner 再次调用,执行Combine,combine 后再溢写。